package com.coalmine.iec104.modules.handle;

import com.coalmine.iec104.config.constant.IecClientInfo;
import com.coalmine.iec104.config.constant.IecConfig;
import com.coalmine.iec104.modules.domain.ReportDate;
import com.coalmine.iec104.modules.domain.UploadMsgInfo;
import com.coalmine.iec104.modules.service.ReportDataService;
import com.coalmine.iec104.modules.util.StringUtils;
import com.coalmine.iec104.modules.util.ThreadPoolUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.compress.utils.Lists;
import org.openmuc.j60870.*;
import org.openmuc.j60870.ie.IeQualifierOfInterrogation;
import org.openmuc.j60870.ie.InformationElement;
import org.openmuc.j60870.ie.InformationObject;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * @Author mkq
 * @Date 2023-11-06 10:59
 * @Description 数据采集客户端 连接服务后发送总召,获取服务端数据
 */
@Slf4j
@Component
@ConditionalOnProperty(name = "iec104.useCollectClient", havingValue = "true")
public class IEC104CollectHandle {

    @Resource
    private IecConfig iecConfig;

    @Resource
    private DelayMsgImpl delayMsgImpl;

    @Resource
    private ReportDataService reportDataService;

    private Map<IecClientInfo, Boolean> clientInfoMap = new ConcurrentHashMap<>();

    private Map<IecClientInfo, Connection> clientConnections = new ConcurrentHashMap<>();

    @PostConstruct
    public void init() {
        List<IecClientInfo> collectClient = iecConfig.getCollectClient();
        if (CollectionUtils.isNotEmpty(collectClient)) {
            for (IecClientInfo iecClientInfo : collectClient) {
                clientInfoMap.put(iecClientInfo, false);
                buildIec104Client(iecClientInfo);
            }
        }
    }

    private void buildIec104Client(IecClientInfo iecClientInfo) {
        iniClient(iecClientInfo);
        // 定时发送总召信息
        generalInterrogation(iecClientInfo);
    }

    private void generalInterrogation(IecClientInfo iecClientInfo) {
        ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1);
        threadPool.scheduleAtFixedRate(() -> {
            sendInterrogation(iecClientInfo);
        }, 10, iecConfig.getUploadRate(), TimeUnit.SECONDS);
    }

    private void sendInterrogation(IecClientInfo iecClientInfo) {
        Connection connection = clientConnections.get(iecClientInfo);
        if (StringUtils.isNotNull(connection) && !connection.isClosed()) {
            try {
                connection.interrogation(iecClientInfo.getCommonAddress(), CauseOfTransmission.ACTIVATION, new IeQualifierOfInterrogation(20));
                log.info("客户端:" + iecClientInfo.getTag() + " 发送总招信息");
            } catch (IOException ioException) {
                log.info("客户端:" + iecClientInfo.getTag() + " 发送总招信息失败");
            }
        }
    }

    private void iniClient(IecClientInfo iecClientInfo) {
        ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1);
        threadPool.scheduleAtFixedRate(() -> {
            if (!clientInfoMap.get(iecClientInfo)) {
                try {
                    destroyClient(iecClientInfo);
                    ClientConnectionBuilder clientConnectionBuilder = new ClientConnectionBuilder(iecClientInfo.getIpAddress());
                    if (iecClientInfo.getPort() != null) {
                        clientConnectionBuilder.setPort(iecClientInfo.getPort());
                    }
                    clientConnectionBuilder.setMaxTimeNoAckReceived(254000);
                    Connection clientConnection = clientConnectionBuilder.build();
                    // 发链路启动帧
                    clientConnection.startDataTransfer(new ConnectionListenerThreadImpl(iecClientInfo));
                    log.info("客户端:" + iecClientInfo.getTag() + " 连接服务成功");
                    clientConnections.put(iecClientInfo, clientConnection);
                    clientInfoMap.put(iecClientInfo, true);
                    // 发送总召信息
                    sendInterrogation(iecClientInfo);
                } catch (Exception e) {
                    log.error("客户端:" + iecClientInfo.getTag() + " iniClient 异常" + e.getMessage(), e);
                }
            }
        }, 5, 1, TimeUnit.SECONDS);
    }


    private class ConnectionListenerThreadImpl implements ConnectionEventListener {

        private IecClientInfo iecClientInfo;

        public ConnectionListenerThreadImpl(IecClientInfo iecClientInfo) {
            this.iecClientInfo = iecClientInfo;
        }

        @Override
        public void newASdu(ASdu aSdu) {
            log.info("客户端:" + iecClientInfo.getTag() + " 接收到 asdu 消息:" + aSdu.toString());
            //ThreadPoolUtil.dealUploadService.execute(() -> buildSendMsg(iecClientInfo, aSdu));
        }


        @Override
        public void connectionClosed(IOException e) {
            log.info("客户端:" + iecClientInfo.getTag() + " 连接断开" + e.getMessage(), e);
            clientInfoMap.put(iecClientInfo, false);
        }

        @Override
        public void dataTransferStateChanged(boolean b) {

        }

    }

    private void buildSendMsg(IecClientInfo iecClientInfo, ASdu aSdu) {
        List<ReportDate> reportData = reportDataService.getReportData();
        List<InformationObject> sendPoints = Lists.newArrayList();
        List<InformationObject> points = Arrays.stream(aSdu.getInformationObjects()).collect(Collectors.toList());
        String msgType = "1";
        if (CollectionUtils.isNotEmpty(reportData) && CollectionUtils.isNotEmpty(points)) {
            for (InformationObject point : points) {
                int informationObjectAddress = point.getInformationObjectAddress();
                if (aSdu.isSequenceOfElements()) {
                    // 地址有序
                    InformationElement[][] informationElements = point.getInformationElements();
                    for (int i = 0; i < informationElements.length; i++) {
                        int infoAddress = informationObjectAddress + i;
                        ReportDate reportDate = reportData.stream().filter(u -> StringUtils.equals(u.getSystemCode(), iecClientInfo.getTag()) && StringUtils.equals(u.getPointKey(), infoAddress + "")).findFirst().orElse(null);
                        if (StringUtils.isNotNull(reportDate)) {
                            msgType = reportDate.getType();
                            InformationObject newPoint = new InformationObject(Integer.parseInt(reportDate.getPointCode()), informationElements[i]);
                            sendPoints.add(newPoint);
                        }
                    }
                } else {
                    ReportDate reportDate = reportData.stream().filter(u -> StringUtils.equals(u.getSystemCode(), iecClientInfo.getTag()) && StringUtils.equals(u.getPointKey(), informationObjectAddress + "")).findFirst().orElse(null);
                    if (StringUtils.isNotNull(reportDate)) {
                        msgType = reportDate.getType();
                        InformationObject newPoint = new InformationObject(Integer.parseInt(reportDate.getPointCode()), point.getInformationElements());
                        sendPoints.add(newPoint);
                    }
                }
            }
        }
        if (CollectionUtils.isNotEmpty(sendPoints)) {
            InformationObject[] informationObjects = sendPoints.toArray(new InformationObject[sendPoints.size()]);
            ASdu newAdu = new ASdu(aSdu.getTypeIdentification(), false, aSdu.getCauseOfTransmission(), aSdu.isTestFrame(), aSdu.isNegativeConfirm(), 0, iecConfig.getCommonAddress(), informationObjects);
            delayMsgImpl.addToDelayQueue(new UploadMsgInfo(msgType, newAdu));
        }
    }

    private void destroyClient(IecClientInfo iecClientInfo) {
        Connection connection = clientConnections.get(iecClientInfo);
        if (StringUtils.isNotNull(connection) && !connection.isClosed()) {
            try {
                connection.stopDataTransfer();
            } catch (IOException ioException) {

            }
            connection.close();
        }
        clientConnections.remove(iecClientInfo);
    }

    @PreDestroy
    private void destroy() {
        for (Connection connection : clientConnections.values()) {
            if (StringUtils.isNotNull(connection) && !connection.isClosed()) {
                try {
                    connection.stopDataTransfer();
                } catch (IOException ioException) {

                }
                connection.close();
            }
        }
    }

}
